---
title: Channel 和 Select 深入
---

Channel 和 `select` 語句是 Go 並發模型的核心。雖然前面的模組涵蓋了基礎知識，但本模組探索了使 Go 並發模型如此強大和富有表現力的高級模式和技巧。

## Channel 方向性

Go channel 可以是定向的，允許你指定 channel 是用於發送、接收還是兩者兼有。這提供了編譯時安全性，並使程式碼意圖更清晰。

<UniversalEditor title="Channel 方向性" compare={true}>
```javascript !! js
// JavaScript: 沒有直接等價物，但可以用不同介面模擬
class ReadOnlyChannel {
  constructor(channel) {
    this.channel = channel;
  }
  
  receive() {
    return this.channel.receive();
  }
}

class WriteOnlyChannel {
  constructor(channel) {
    this.channel = channel;
  }
  
  send(data) {
    return this.channel.send(data);
  }
}

// 使用
const baseChannel = new EventChannel();
const readOnly = new ReadOnlyChannel(baseChannel);
const writeOnly = new WriteOnlyChannel(baseChannel);
```

```go !! go
// Go: Channel 方向性
package main

import (
	"fmt"
	"time"
)

// sendOnly: 只能向 channel 發送
func sender(ch chan<- string) {
	ch <- "來自發送者的問候"
	close(ch) // 可以關閉 channel
}

// receiveOnly: 只能從 channel 接收
func receiver(ch <-chan string) {
	msg := <-ch
	fmt.Println("收到:", msg)
	// 不能發送: ch <- "test" // 編譯錯誤
	// 不能關閉: close(ch) // 編譯錯誤
}

// bidirectional: 既可以發送也可以接收
func processor(ch chan string) {
	msg := <-ch // 可以接收
	ch <- "已處理: " + msg // 可以發送
}

func main() {
	// 雙向 channel
	ch := make(chan string)
	
	// 傳遞給具有特定方向性的函數
	go sender(ch)
	receiver(ch)
	
	// 創建一個新的 channel 用於處理
	procCh := make(chan string)
	go processor(procCh)
	procCh <- "測試訊息"
	result := <-procCh
	fmt.Println("結果:", result)
}
```
</UniversalEditor>

## 高級 Select 模式

### 帶 Default 的 Select

`select` 中的 `default` 分支允許非阻塞的 channel 操作：

<UniversalEditor title="帶 Default 的 Select" compare={true}>
```javascript !! js
// JavaScript: 模擬非阻塞操作
class NonBlockingChannel {
  constructor() {
    this.buffer = [];
    this.listeners = [];
  }
  
  send(data) {
    if (this.listeners.length > 0) {
      const listener = this.listeners.shift();
      listener(data);
      return true;
    } else {
      this.buffer.push(data);
      return false;
    }
  }
  
  receive() {
    if (this.buffer.length > 0) {
      return this.buffer.shift();
    } else {
      return null; // 非阻塞
    }
  }
  
  receiveAsync(callback) {
    if (this.buffer.length > 0) {
      callback(this.buffer.shift());
    } else {
      this.listeners.push(callback);
    }
  }
}

// 使用
const ch = new NonBlockingChannel();

// 非阻塞發送
const sent = ch.send("hello");
console.log("發送:", sent);

// 非阻塞接收
const received = ch.receive();
console.log("接收:", received);
```

```go !! go
// Go: 使用 default 進行非阻塞操作
package main

import (
	"fmt"
	"time"
)

func nonBlockingSend(ch chan<- string, value string) bool {
	select {
	case ch <- value:
		return true // 成功發送
	default:
		return false // Channel 滿了或沒有接收者
	}
}

func nonBlockingReceive(ch <-chan string) (string, bool) {
	select {
	case value := <-ch:
		return value, true // 成功接收
	default:
		return "", false // Channel 為空或沒有發送者
	}
}

func main() {
	// 緩衝 channel
	ch := make(chan string, 2)
	
	// 非阻塞發送
	if sent := nonBlockingSend(ch, "first"); sent {
		fmt.Println("成功發送 first")
	}
	
	if sent := nonBlockingSend(ch, "second"); sent {
		fmt.Println("成功發送 second")
	}
	
	// 嘗試向滿的 channel 發送（非阻塞）
	if sent := nonBlockingSend(ch, "third"); !sent {
		fmt.Println("發送 third 失敗 - channel 滿了")
	}
	
	// 非阻塞接收
	if value, received := nonBlockingReceive(ch); received {
		fmt.Println("收到:", value)
	}
	
	// 嘗試從空的 channel 接收（非阻塞）
	if value, received := nonBlockingReceive(ch); !received {
		fmt.Println("沒有可用資料")
	} else {
		fmt.Println("收到:", value)
	}
}
```
</UniversalEditor>

### 帶超時的 Select

使用 `select` 和 `time.After` 提供超時功能：

<UniversalEditor title="帶超時的 Select" compare={true}>
```javascript !! js
// JavaScript: 帶超時的 Promise
function withTimeout(promise, timeoutMs) {
  const timeoutPromise = new Promise((_, reject) => {
    setTimeout(() => reject(new Error('Timeout')), timeoutMs);
  });
  
  return Promise.race([promise, timeoutPromise]);
}

// 使用
async function fetchWithTimeout(url, timeoutMs) {
  try {
    const response = await withTimeout(fetch(url), timeoutMs);
    return await response.text();
  } catch (error) {
    if (error.message === 'Timeout') {
      console.log('請求超時');
    } else {
      console.log('請求失敗:', error.message);
    }
  }
}

// 模擬異步操作
const slowOperation = new Promise(resolve => {
  setTimeout(() => resolve('操作完成'), 3000);
});

withTimeout(slowOperation, 1000)
  .then(result => console.log(result))
  .catch(error => console.log('錯誤:', error.message));
```

```go !! go
// Go: 帶超時的 Select
package main

import (
	"fmt"
	"time"
)

func operationWithTimeout(operation func() string, timeout time.Duration) (string, error) {
	// 為結果創建一個 channel
	resultCh := make(chan string, 1)
	
	// 在 goroutine 中運行操作
	go func() {
		result := operation()
		resultCh <- result
	}()
	
	// 等待結果或超時
	select {
	case result := <-resultCh:
		return result, nil
	case <-time.After(timeout):
		return "", fmt.Errorf("操作在 %v 後超時", timeout)
	}
}

func slowOperation() string {
	time.Sleep(3 * time.Second)
	return "操作完成"
}

func main() {
	// 嘗試短超時
	result, err := operationWithTimeout(slowOperation, 1*time.Second)
	if err != nil {
		fmt.Println("錯誤:", err)
	} else {
		fmt.Println("結果:", result)
	}
	
	// 嘗試長超時
	result, err = operationWithTimeout(slowOperation, 5*time.Second)
	if err != nil {
		fmt.Println("錯誤:", err)
	} else {
		fmt.Println("結果:", result)
	}
}
```
</UniversalEditor>

## Channel 模式

### Fan-Out 模式

Fan-out 模式將工作從一個 channel 分發到多個工作線程：

<UniversalEditor title="Fan-Out 模式" compare={true}>
```javascript !! js
// JavaScript: 使用 Promise.all 的 Fan-out
class FanOut {
  constructor(workerCount) {
    this.workerCount = workerCount;
  }
  
  async process(tasks) {
    const workers = [];
    
    // 創建工作線程
    for (let i = 0; i < this.workerCount; i++) {
      workers.push(this.createWorker(i));
    }
    
    // 在工作線程間分發任務
    const promises = [];
    for (let i = 0; i < tasks.length; i++) {
      const workerIndex = i % this.workerCount;
      promises.push(workers[workerIndex](tasks[i]));
    }
    
    return Promise.all(promises);
  }
  
  createWorker(id) {
    return async (task) => {
      // 模擬工作
      await new Promise(resolve => setTimeout(resolve, Math.random() * 1000));
      return `工作線程 ${id} 處理了: ${task}`;
    };
  }
}

// 使用
const fanOut = new FanOut(3);
const tasks = Array.from({length: 10}, (_, i) => `任務 ${i}`);
fanOut.process(tasks).then(results => {
  console.log("所有任務完成:", results);
});
```

```go !! go
// Go: Fan-out 模式
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func fanOut(input <-chan string, workerCount int) []<-chan string {
	outputs := make([]chan string, workerCount)
	
	// 創建輸出 channels
	for i := 0; i < workerCount; i++ {
		outputs[i] = make(chan string)
	}
	
	// 啟動工作線程
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go worker(i, input, outputs[i], &wg)
	}
	
	// 當所有工作線程完成時關閉輸出 channels
	go func() {
		wg.Wait()
		for _, output := range outputs {
			close(output)
		}
	}()
	
	// 轉換為只讀 channels
	result := make([]<-chan string, workerCount)
	for i, output := range outputs {
		result[i] = output
	}
	
	return result
}

func worker(id int, input <-chan string, output chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	
	for task := range input {
		// 模擬工作
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		output <- fmt.Sprintf("工作線程 %d 處理了: %s", id, task)
	}
}

func main() {
	// 創建輸入 channel
	input := make(chan string)
	
	// 啟動 fan-out
	outputs := fanOut(input, 3)
	
	// 發送任務
	go func() {
		defer close(input)
		for i := 0; i < 10; i++ {
			input <- fmt.Sprintf("任務 %d", i)
		}
	}()
	
	// 從所有工作線程收集結果
	var results []string
	for _, output := range outputs {
		for result := range output {
			results = append(results, result)
		}
	}
	
	fmt.Println("所有任務完成:", results)
}
```
</UniversalEditor>

### Fan-In 模式

Fan-in 模式將多個 channels 合併為一個：

<UniversalEditor title="Fan-In 模式" compare={true}>
```javascript !! js
// JavaScript: 使用 Promise.race 或 Promise.all 的 Fan-in
class FanIn {
  constructor() {
    this.output = [];
  }
  
  async collect(promises) {
    // 收集所有完成的結果
    const results = [];
    
    // 為每個輸入創建一個 promise
    const inputPromises = promises.map((promise, index) => 
      promise.then(result => ({ index, result }))
    );
    
    // 收集到達的結果
    while (inputPromises.length > 0) {
      const { index, result } = await Promise.race(inputPromises);
      results.push(result);
      inputPromises.splice(index, 1);
    }
    
    return results;
  }
}

// 使用
const fanIn = new FanIn();
const promises = [
  new Promise(resolve => setTimeout(() => resolve("結果 1"), 1000)),
  new Promise(resolve => setTimeout(() => resolve("結果 2"), 500)),
  new Promise(resolve => setTimeout(() => resolve("結果 3"), 1500))
];

fanIn.collect(promises).then(results => {
  console.log("所有結果:", results);
});
```

```go !! go
// Go: Fan-in 模式
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func fanIn(channels ...<-chan string) <-chan string {
	output := make(chan string)
	var wg sync.WaitGroup
	
	// 為每個輸入 channel 啟動一個 goroutine
	for _, ch := range channels {
		wg.Add(1)
		go func(input <-chan string) {
			defer wg.Done()
			for value := range input {
				output <- value
			}
		}(ch)
	}
	
	// 當所有輸入 channels 關閉時關閉輸出
	go func() {
		wg.Wait()
		close(output)
	}()
	
	return output
}

func producer(id int) <-chan string {
	output := make(chan string)
	
	go func() {
		defer close(output)
		for i := 0; i < 3; i++ {
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			output <- fmt.Sprintf("生產者 %d: 項目 %d", id, i)
		}
	}()
	
	return output
}

func main() {
	// 創建多個生產者
	producer1 := producer(1)
	producer2 := producer(2)
	producer3 := producer(3)
	
	// Fan-in 所有生產者
	combined := fanIn(producer1, producer2, producer3)
	
	// 收集所有結果
	var results []string
	for result := range combined {
		results = append(results, result)
		fmt.Println("收到:", result)
	}
	
	fmt.Println("收集了所有結果:", len(results))
}
```
</UniversalEditor>

## Channel 組合

### 帶錯誤處理的管道

構建具有錯誤處理的健壯管道：

<UniversalEditor title="帶錯誤處理的管道" compare={true}>
```javascript !! js
// JavaScript: 帶錯誤處理的管道
class Pipeline {
  constructor() {
    this.stages = [];
  }
  
  addStage(stage) {
    this.stages.push(stage);
    return this;
  }
  
  async process(input) {
    let current = input;
    
    for (const stage of this.stages) {
      try {
        current = await stage(current);
      } catch (error) {
        console.error('管道階段失敗:', error);
        throw error;
      }
    }
    
    return current;
  }
}

// 使用
const pipeline = new Pipeline()
  .addStage(async (data) => {
    if (data.some(x => x < 0)) {
      throw new Error('不允許負數');
    }
    return data.map(x => x * 2);
  })
  .addStage(async (data) => {
    return data.filter(x => x > 5);
  })
  .addStage(async (data) => {
    return data.reduce((sum, x) => sum + x, 0);
  });

// 測試有效資料
pipeline.process([1, 2, 3, 4, 5])
  .then(result => console.log('結果:', result))
  .catch(error => console.log('錯誤:', error.message));

// 測試無效資料
pipeline.process([1, -2, 3, 4, 5])
  .then(result => console.log('結果:', result))
  .catch(error => console.log('錯誤:', error.message));
```

```go !! go
// Go: 帶錯誤處理的管道
package main

import (
	"fmt"
	"time"
)

type PipelineStage func(<-chan int) (<-chan int, <-chan error)

func stage1(input <-chan int) (<-chan int, <-chan error) {
	output := make(chan int)
	errors := make(chan error)
	
	go func() {
		defer close(output)
		defer close(errors)
		
		for value := range input {
			if value < 0 {
				errors <- fmt.Errorf("不允許負數: %d", value)
				continue
			}
			output <- value * 2
		}
	}()
	
	return output, errors
}

func stage2(input <-chan int) (<-chan int, <-chan error) {
	output := make(chan int)
	errors := make(chan error)
	
	go func() {
		defer close(output)
		defer close(errors)
		
		for value := range input {
			if value > 5 {
				output <- value
			}
		}
	}()
	
	return output, errors
}

func stage3(input <-chan int) (<-chan int, <-chan error) {
	output := make(chan int)
	errors := make(chan error)
	
	go func() {
		defer close(output)
		defer close(errors)
		
		sum := 0
		count := 0
		for value := range input {
			sum += value
			count++
		}
		
		if count > 0 {
			output <- sum
		}
	}()
	
	return output, errors
}

func main() {
	// 創建輸入 channel
	input := make(chan int)
	
	// 構建管道
	stage1Out, stage1Err := stage1(input)
	stage2Out, stage2Err := stage2(stage1Out)
	stage3Out, stage3Err := stage3(stage2Out)
	
	// 發送測試資料
	go func() {
		defer close(input)
		testData := []int{1, 2, 3, 4, 5}
		for _, value := range testData {
			input <- value
		}
	}()
	
	// 收集結果和錯誤
	var results []int
	var errors []error
	
	// 從最終階段收集
	for result := range stage3Out {
		results = append(results, result)
	}
	
	// 從所有階段收集錯誤
	for err := range stage1Err {
		errors = append(errors, err)
	}
	for err := range stage2Err {
		errors = append(errors, err)
	}
	for err := range stage3Err {
		errors = append(errors, err)
	}
	
	fmt.Println("結果:", results)
	if len(errors) > 0 {
		fmt.Println("錯誤:", errors)
	}
}
```
</UniversalEditor>

## Context 和取消

使用 context 和 channels 進行取消：

<UniversalEditor title="Context 和取消" compare={true}>
```javascript !! js
// JavaScript: 使用 AbortController 進行取消
class CancellablePipeline {
  constructor() {
    this.controller = new AbortController();
  }
  
  async process(data, signal) {
    const results = [];
    
    for (const item of data) {
      // 檢查是否已取消
      if (signal.aborted) {
        throw new Error('管道已取消');
      }
      
      // 模擬工作
      await new Promise(resolve => setTimeout(resolve, 100));
      results.push(item * 2);
    }
    
    return results;
  }
  
  cancel() {
    this.controller.abort();
  }
}

// 使用
const pipeline = new CancellablePipeline();

// 開始處理
const processPromise = pipeline.process([1, 2, 3, 4, 5], pipeline.controller.signal);

// 200ms 後取消
setTimeout(() => {
  pipeline.cancel();
}, 200);

processPromise
  .then(results => console.log('結果:', results))
  .catch(error => console.log('錯誤:', error.message));
```

```go !! go
// Go: 使用 context 和 channels 進行取消
package main

import (
	"context"
	"fmt"
	"time"
)

func processWithContext(ctx context.Context, input <-chan int) <-chan int {
	output := make(chan int)
	
	go func() {
		defer close(output)
		
		for {
			select {
			case value := <-input:
				// 模擬工作
				time.Sleep(100 * time.Millisecond)
				output <- value * 2
			case <-ctx.Done():
				fmt.Println("處理已取消:", ctx.Err())
				return
			}
		}
	}()
	
	return output
}

func main() {
	// 創建帶超時的 context
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	
	// 創建輸入 channel
	input := make(chan int)
	
	// 開始處理
	output := processWithContext(ctx, input)
	
	// 發送資料
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			select {
			case input <- i:
				fmt.Printf("發送: %d\n", i)
			case <-ctx.Done():
				fmt.Println("發送已取消")
				return
			}
		}
	}()
	
	// 收集結果
	var results []int
	for {
		select {
		case result := <-output:
			results = append(results, result)
			fmt.Printf("收到: %d\n", result)
		case <-ctx.Done():
			fmt.Println("收集已取消")
			goto done
		}
	}
	
done:
	fmt.Printf("收集了 %d 個結果\n", len(results))
}
```
</UniversalEditor>

## Channel 最佳實踐

### 1. 始終由發送者關閉 Channel

<UniversalEditor title="Channel 關閉最佳實踐" compare={true}>
```javascript !! js
// JavaScript: 資源清理模式
class ResourceManager {
  constructor() {
    this.resources = new Set();
    this.closed = false;
  }
  
  addResource(resource) {
    if (this.closed) {
      throw new Error('管理器已關閉');
    }
    this.resources.add(resource);
  }
  
  close() {
    if (this.closed) return;
    
    this.closed = true;
    // 清理所有資源
    for (const resource of this.resources) {
      resource.cleanup();
    }
    this.resources.clear();
  }
  
  isClosed() {
    return this.closed;
  }
}
```

```go !! go
// Go: Channel 關閉最佳實踐
package main

import (
	"fmt"
	"sync"
)

type SafeChannel struct {
	ch     chan string
	closed bool
	mutex  sync.RWMutex
}

func NewSafeChannel(bufferSize int) *SafeChannel {
	return &SafeChannel{
		ch: make(chan string, bufferSize),
	}
}

func (sc *SafeChannel) Send(value string) error {
	sc.mutex.RLock()
	defer sc.mutex.RUnlock()
	
	if sc.closed {
		return fmt.Errorf("channel 已關閉")
	}
	
	sc.ch <- value
	return nil
}

func (sc *SafeChannel) Receive() (string, bool) {
	value, ok := <-sc.ch
	return value, ok
}

func (sc *SafeChannel) Close() {
	sc.mutex.Lock()
	defer sc.mutex.Unlock()
	
	if !sc.closed {
		sc.closed = true
		close(sc.ch)
	}
}

func (sc *SafeChannel) IsClosed() bool {
	sc.mutex.RLock()
	defer sc.mutex.RUnlock()
	return sc.closed
}

func main() {
	ch := NewSafeChannel(5)
	
	// 發送一些資料
	ch.Send("Hello")
	ch.Send("World")
	
	// 關閉 channel
	ch.Close()
	
	// 嘗試在關閉後發送
	if err := ch.Send("Test"); err != nil {
		fmt.Println("錯誤:", err)
	}
	
	// 接收剩餘資料
	for {
		if value, ok := ch.Receive(); ok {
			fmt.Println("收到:", value)
		} else {
			break
		}
	}
}
```
</UniversalEditor>

### 2. 避免 Channel 洩漏

<UniversalEditor title="防止 Channel 洩漏" compare={true}>
```javascript !! js
// JavaScript: 防止資源洩漏
class ChannelManager {
  constructor() {
    this.channels = new Set();
  }
  
  createChannel() {
    const channel = new EventChannel();
    this.channels.add(channel);
    return channel;
  }
  
  closeAll() {
    for (const channel of this.channels) {
      channel.close();
    }
    this.channels.clear();
  }
  
  cleanup() {
    this.closeAll();
  }
}
```

```go !! go
// Go: 防止 channel 洩漏
package main

import (
	"fmt"
	"sync"
	"time"
)

type ChannelManager struct {
	channels map[string]chan string
	mutex    sync.RWMutex
}

func NewChannelManager() *ChannelManager {
	return &ChannelManager{
		channels: make(map[string]chan string),
	}
}

func (cm *ChannelManager) CreateChannel(id string) chan string {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	
	ch := make(chan string, 10)
	cm.channels[id] = ch
	return ch
}

func (cm *ChannelManager) CloseChannel(id string) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	
	if ch, exists := cm.channels[id]; exists {
		close(ch)
		delete(cm.channels, id)
	}
}

func (cm *ChannelManager) CloseAll() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	
	for id, ch := range cm.channels {
		close(ch)
		delete(cm.channels, id)
	}
}

func (cm *ChannelManager) GetChannelCount() int {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()
	return len(cm.channels)
}

func main() {
	manager := NewChannelManager()
	
	// 創建 channels
	ch1 := manager.CreateChannel("worker1")
	ch2 := manager.CreateChannel("worker2")
	
	fmt.Printf("活躍 channels: %d\n", manager.GetChannelCount())
	
	// 使用 channels
	go func() {
		ch1 <- "來自 worker1 的問候"
		time.Sleep(100 * time.Millisecond)
		manager.CloseChannel("worker1")
	}()
	
	go func() {
		ch2 <- "來自 worker2 的問候"
		time.Sleep(100 * time.Millisecond)
		manager.CloseChannel("worker2")
	}()
	
	// 接收訊息
	msg1 := <-ch1
	msg2 := <-ch2
	
	fmt.Println("收到:", msg1, msg2)
	fmt.Printf("活躍 channels: %d\n", manager.GetChannelCount())
	
	// 清理
	manager.CloseAll()
	fmt.Printf("清理後活躍 channels: %d\n", manager.GetChannelCount())
}
```
</UniversalEditor>

---

### 練習題:
1. 只發送 channel 和只接收 channel 有什麼區別？
2. `select` 中的 `default` 分支如何實現非阻塞操作？
3. 解釋 fan-out 和 fan-in 模式以及何時使用每種模式。
4. 如何使用 channels 和 `select` 實現超時功能？
5. 關閉 channels 和防止洩漏有哪些最佳實踐？

### 專案想法:
創建一個使用 channels 將傳入請求分發到多個工作 goroutine 的負載均衡器。負載均衡器應該實現健康檢查、優雅關閉和請求超時處理，使用本模組中學到的模式。
